package com.xxl.job.core.executor;

import com.xxl.job.core.biz.AdminBiz;
import com.xxl.job.core.biz.client.AdminBizClient;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.handler.annotation.XxlJob;
import com.xxl.job.core.handler.impl.MethodJobHandler;
import com.xxl.job.core.log.XxlJobFileAppender;
import com.xxl.job.core.server.EmbedServer;
import com.xxl.job.core.thread.JobLogFileCleanThread;
import com.xxl.job.core.thread.JobThread;
import com.xxl.job.core.thread.TriggerCallbackThread;
import com.xxl.job.core.util.IpUtil;
import com.xxl.job.core.util.NetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * @author xuxueli
 * @since 2016/3/2
 */
public class XxlJobExecutor {

	private static final Logger logger = LoggerFactory.getLogger(XxlJobExecutor.class);

	private String adminAddresses;

	private String accessToken;

	private String appname;

	private String address;

	private String ip;

	private int port;

	private String logPath;

	private int logRetentionDays;

	public void setAdminAddresses(String adminAddresses) {
		this.adminAddresses = adminAddresses;
	}

	public void setAccessToken(String accessToken) {
		this.accessToken = accessToken;
	}

	public void setAppname(String appname) {
		this.appname = appname;
	}

	public void setAddress(String address) {
		this.address = address;
	}

	public void setIp(String ip) {
		this.ip = ip;
	}

	public void setPort(int port) {
		this.port = port;
	}

	public void setLogPath(String logPath) {
		this.logPath = logPath;
	}

	public void setLogRetentionDays(int logRetentionDays) {
		this.logRetentionDays = logRetentionDays;
	}

	// ---------------------- start + stop ----------------------
	public void start() throws Exception {

		// init logpath
		XxlJobFileAppender.initLogPath(logPath);

		// init invoker, admin-client
		initAdminBizList(adminAddresses, accessToken);

		// init JobLogFileCleanThread
		JobLogFileCleanThread.getInstance().start(logRetentionDays);

		// init TriggerCallbackThread
		TriggerCallbackThread.getInstance().start();

		// init executor-server
		initEmbedServer(address, ip, port, appname, accessToken);
	}

	public void destroy() {
		// destroy executor-server
		stopEmbedServer();

		// destroy jobThreadRepository
		if (!jobThreadRepository.isEmpty()) {
			for (Map.Entry<Integer, JobThread> item : jobThreadRepository.entrySet()) {
				JobThread oldJobThread = removeJobThread(item.getKey(), "web container destroy and kill the job.");
				// wait for job thread push result to callback queue
				if (oldJobThread != null) {
					try {
						oldJobThread.join();
					}
					catch (InterruptedException e) {
						logger.error(">>>>>>>>>>> xxl-job, JobThread destroy(join) error, jobId:{}", item.getKey(), e);
					}
				}
			}
			jobThreadRepository.clear();
		}
		jobHandlerRepository.clear();

		// destroy JobLogFileCleanThread
		JobLogFileCleanThread.getInstance().toStop();

		// destroy TriggerCallbackThread
		TriggerCallbackThread.getInstance().toStop();

	}

	// ---------------------- admin-client (rpc invoker) ----------------------
	private static List<AdminBiz> adminBizList;

	private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {
		if (adminAddresses != null && !adminAddresses.trim().isEmpty()) {
			for (String address : adminAddresses.trim().split(",")) {
				if (address != null && !address.trim().isEmpty()) {

					AdminBiz adminBiz = new AdminBizClient(address.trim(), accessToken);

					if (adminBizList == null) {
						adminBizList = new ArrayList<>();
					}
					adminBizList.add(adminBiz);
				}
			}
		}
	}

	public static List<AdminBiz> getAdminBizList() {
		return adminBizList;
	}

	// ---------------------- executor-server (rpc provider) ----------------------
	private EmbedServer embedServer = null;

	private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) {

		// fill ip port
		port = port > 0 ? port : NetUtil.findAvailablePort(9999);
		ip = (ip != null && !ip.trim().isEmpty()) ? ip : IpUtil.getIp();

		// generate address
		if (address == null || address.trim().isEmpty()) {
			String ip_port_address = IpUtil.getIpPort(ip, port);

			/*
			 * registry-address：default use address to registry , otherwise use ip:port if
			 * address is null
			 */
			address = "http://{ip_port}/".replace("{ip_port}", ip_port_address);
		}

		// accessToken
		if (accessToken == null || accessToken.trim().isEmpty()) {
			logger.warn("xxl-job accessToken is empty. To ensure system security, please set the accessToken.");
		}

		// start
		embedServer = new EmbedServer();
		embedServer.start(address, port, appname, accessToken);
	}

	private void stopEmbedServer() {
		// stop provider factory
		if (embedServer != null) {
			try {
				embedServer.stop();
			}
			catch (Exception e) {
				logger.error(e.getMessage(), e);
			}
		}
	}

	// ---------------------- job handler repository ----------------------
	private static final ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<>();

	public static IJobHandler loadJobHandler(String name) {
		return jobHandlerRepository.get(name);
	}

	public static IJobHandler registJobHandler(String name, IJobHandler jobHandler) {
		logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);
		return jobHandlerRepository.put(name, jobHandler);
	}

	protected void registJobHandler(XxlJob xxlJob, Object bean, Method executeMethod) {
		if (xxlJob == null) {
			return;
		}

		String name = xxlJob.value();
		// make and simplify the variables since they'll be called several times later
		Class<?> clazz = bean.getClass();
		String methodName = executeMethod.getName();
		if (name.trim().isEmpty()) {
			throw new RuntimeException(
					"xxl-job method-jobhandler name invalid, for[" + clazz + "#" + methodName + "] .");
		}
		if (loadJobHandler(name) != null) {
			throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");
		}

		executeMethod.setAccessible(true);

		// init and destroy
		Method initMethod = null;
		Method destroyMethod = null;

		if (!xxlJob.init().trim().isEmpty()) {
			try {
				initMethod = clazz.getDeclaredMethod(xxlJob.init());
				initMethod.setAccessible(true);
			}
			catch (NoSuchMethodException e) {
				throw new RuntimeException(
						"xxl-job method-jobhandler initMethod invalid, for[" + clazz + "#" + methodName + "] .");
			}
		}
		if (!xxlJob.destroy().trim().isEmpty()) {
			try {
				destroyMethod = clazz.getDeclaredMethod(xxlJob.destroy());
				destroyMethod.setAccessible(true);
			}
			catch (NoSuchMethodException e) {
				throw new RuntimeException(
						"xxl-job method-jobhandler destroyMethod invalid, for[" + clazz + "#" + methodName + "] .");
			}
		}

		// registry jobhandler
		registJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod));

	}

	// ---------------------- job thread repository ----------------------
	private static final ConcurrentMap<Integer, JobThread> jobThreadRepository = new ConcurrentHashMap<>();

	public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason) {
		JobThread newJobThread = new JobThread(jobId, handler);
		newJobThread.start();
		logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}",
				new Object[] { jobId, handler });

		JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread);
		/*
		 * putIfAbsent | oh my god, map's put method return the old value!!!
		 */
		if (oldJobThread != null) {
			oldJobThread.toStop(removeOldReason);
			oldJobThread.interrupt();
		}

		return newJobThread;
	}

	public static JobThread removeJobThread(int jobId, String removeOldReason) {
		JobThread oldJobThread = jobThreadRepository.remove(jobId);
		if (oldJobThread != null) {
			oldJobThread.toStop(removeOldReason);
			oldJobThread.interrupt();

			return oldJobThread;
		}
		return null;
	}

	public static JobThread loadJobThread(int jobId) {
		return jobThreadRepository.get(jobId);
	}

}
